面向 PHP 开发者的 RabbitMQ 核心笔记,包含基础概念、常见代码示例、死信队列、幂等处理、常见错误等。
RabbitMQ 是一个 消息队列系统(Message Queue),用于系统之间的 异步通信、解耦、削峰填谷。
常见使用场景:
示意结构:
Producer -> Exchange -> Queue -> Consumer
发送消息的一方。
PHP 示例:
$channel->basic_publish($msg, $exchange, $routing_key);
负责 接收消息并路由到 Queue。
RabbitMQ 不允许直接发送到 Queue:
Producer -> Exchange -> Queue
Exchange 类型:
| 类型 | 说明 | | ------- | -------------------- | | direct | 精确匹配 routing key | | fanout | 广播 | | topic | 通配符匹配 | | headers | header 匹配 |
用于存储消息,消费者从队列中读取消息。
$channel->queue_declare(
'order.queue',
false,
true,
false,
false
);
参数说明:
| 参数 | 含义 | | ----------- | ---------- | | durable | 是否持久化 | | exclusive | 是否独占 | | auto_delete | 自动删除 |
Routing Key 用于 Exchange 和 Queue 的匹配。
例如:
routing_key = order.create
Exchange 必须绑定 Queue 才能发送消息。
$channel->queue_bind(
'order.queue',
'order.exchange',
'order.create'
);
结构:
Exchange
|
routing_key
|
Queue
安装库:
composer require php-amqplib/php-amqplib
发送消息:
use PhpAmqpLib\Message\AMQPMessage;
$msg = new AMQPMessage('hello world');
$channel->basic_publish(
$msg,
'order.exchange',
'order.create'
);
参数说明:
basic_publish(message, exchange, routing_key)
$callback = function ($msg) {
echo "receive: " . $msg->body . "\n";
};
$channel->basic_consume(
'order.queue',
'',
false,
true,
false,
false,
$callback
);
while ($channel->is_consuming()) {
$channel->wait();
}
RabbitMQ 有两种确认方式。
auto_ack = true
问题:
consumer 崩溃
消息可能丢失
$msg->ack();
处理流程:
consumer处理
|
成功
|
ack
失败:
nack / reject
为了防止 RabbitMQ 重启导致消息丢失,需要开启持久化。
$channel->exchange_declare(
'order.exchange',
'direct',
false,
true,
false
);
$channel->queue_declare(
'order.queue',
false,
true,
false,
false
);
$msg = new AMQPMessage(
$data,
['delivery_mode' => 2]
);
RabbitMQ 在某些情况下可能会 重复投递消息:
解决方法:
例如:
order_id
message_id
mq_message
字段示例:
| 字段 | 含义 | | ---------- | ----------- | | id | 主键 | | message_id | 消息唯一 ID | | status | 处理状态 |
消费前:
INSERT INTO mq_message(message_id)
如果插入失败说明:
消息已经处理过
死信队列用于处理:
1️⃣ 消息被拒绝
basic.reject
basic.nack
2️⃣ 消息过期
TTL
3️⃣ 队列满
队列参数:
x-dead-letter-exchange
x-dead-letter-routing-key
示例:
use PhpAmqpLib\Wire\AMQPTable;
$args = new AMQPTable([
'x-dead-letter-exchange' => 'order.dlx.exchange'
]);
queue1
|
消费失败
|
dead letter
|
DLX
|
queue2
原因:
use PhpAmqpLib\Wire\AMQPTable;
没有引入类。
示例错误:
inequivalent arg 'x-dead-letter-exchange'
原因:
queue 参数与已存在队列不一致
解决:
需要做到三点:
解决方案:
常见方案:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
// 1. 建立连接
$connection = new AMQPStreamConnection(
'rabbitmq',
5672,
'guest',
'guest'
);
$channel = $connection->channel();
$channel->exchange_declare(
'order.exchange',
'direct',
false,
true,
false
);
// 2. 声明 durable queue
$queueName = 'order.queue';
$channel->queue_declare(
$queueName,
false, // passive
true, // durable
false, // exclusive
false, // auto_delete
false,
new AMQPTable([
'x-dead-letter-exchange' => 'order.dlx.exchange',
'x-dead-letter-routing-key' => 'order.dlq'
])
);
// 3. 构造消息(persistent)
$data = [
'order_id' => uniqid(),
'amount' => 100,
'time' => date('Y-m-d H:i:s')
];
$msg = new AMQPMessage(
json_encode($data, JSON_UNESCAPED_UNICODE),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$channel->queue_bind('order.queue', 'order.exchange', 'order');
// 4. 发送消息
$channel->basic_publish(
$msg,
'order.exchange',
'order'
);
echo " [x] Send order: {$data['order_id']}\n";
// 5. 关闭资源
$channel->close();
$connection->close();
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection(
'rabbitmq',
5672,
'guest',
'guest'
);
$channel = $connection->channel();
$args = new AMQPTable([
'x-dead-letter-exchange' => 'order.dlx.exchange',
'x-dead-letter-routing-key'=> 'order.dlq',
]);
$channel->queue_declare(
'order.queue',
false,
true,
false,
false,
false,
$args
);
$queueName = 'order.queue';
// 一次只处理一条
$channel->basic_qos(null, 1, null);
// ================== 新增部分开始 ==================
$running = true;
// 监听 SIGTERM / SIGINT
pcntl_signal(SIGTERM, function () use (&$running) {
echo " [!] SIGTERM received, stop consuming...\n";
$running = false;
});
pcntl_signal(SIGINT, function () use (&$running) {
echo " [!] SIGINT received, stop consuming...\n";
$running = false;
});
// ================== 新增部分结束 ==================
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
$data = json_decode($msg->body, true);
$orderId = $data['order_id'];
echo " [>] Processing order {$orderId}\n";
// 模拟耗时任务
// sleep(2);
echo " [✓] Done {$orderId}\n";
// 业务成功后 ACK
$msg->ack();
};
$channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
$callback
);
// ================== 核心循环修改 ==================
while ($running && $channel->is_consuming()) {
pcntl_signal_dispatch(); // 关键
try {
$channel->wait(null, false, 1);
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
// 空闲超时,忽略
}
}
// 退出前关闭资源
echo " [*] Graceful shutdown\n";
$channel->close();
$connection->close();